package gu.simplemq.stomp;

import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;

import org.projectodd.stilts.stomp.StompException;
import org.projectodd.stilts.stomp.StompMessage;
import org.projectodd.stilts.stomp.Subscription.AckMode;
import org.projectodd.stilts.stomp.client.ClientSubscription;
import org.projectodd.stilts.stomp.client.MessageHandler;
import org.projectodd.stilts.stomp.client.StompClient;
import org.projectodd.stilts.stomp.client.SubscriptionBuilder;

import com.google.common.collect.Maps;

import gu.simplemq.BaseMQDispatcher;
import gu.simplemq.utils.URISupport;

/**
 * STOMP分发器实现<br>
 * @author guyadong
 *
 */
class BaseStompDispatcher extends BaseMQDispatcher<StompClient>implements MessageHandler  {
	private ConcurrentMap<String, ClientSubscription> subscriptions = Maps.newConcurrentMap();
	private final String destPrefix;//="/topic/"; 
	BaseStompDispatcher(StompPoolLazy pool,String destPrefix) {
		super(pool);
		this.destPrefix = destPrefix;
	}

	@Override
	protected void doInit() throws Exception {
		// DO NOTHING
	}

	@Override
	protected void doSub(String channel) throws Exception {
		SubscriptionBuilder builder = getConnection().subscribe( destPrefix + channel )
        .withMessageHandler( this )
        .withAckMode( AckMode.CLIENT_INDIVIDUAL );
		for(Entry<String, String> entry:((StompPoolLazy)pool).getHeaders().entrySet()){
			builder.withHeader(entry.getKey(), entry.getValue());
		}
		subscriptions.put(channel,builder.start());
	}

	@Override
	protected void doUnsub(String channel) throws StompException {
		ClientSubscription sub = subscriptions.remove(channel);
		if(sub != null && sub.isActive() && getConnection().isConnected()){
			sub.unsubscribe();
		}
	}

	@Override
	public void handle(StompMessage message) {
		String text = message.getContentAsString();
		String topic = URISupport.stripPrefix(message.getDestination(),destPrefix);
		this.dispatch(topic, text);
	}

}
